import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import java.util.ArrayList;
//import org.apache.log4j.{Level, Logger};


/**
 * Summary:
 *     Aggregate: min()、minBy()、max()、maxBy() 滚动聚合并输出每次滚动聚合后的结果
 */
public class Aggregations
{
//    Logger.getRootLogger.setLevel(Level.ERROR)
//    Logger.rootLogger=info;
//    Logger.level
    public static void main(String[] args) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品，以及商品的价格。
        ArrayList<UserActionLogPOJO> userActionLogs = new ArrayList<>();
        UserActionLogPOJO userActionLog1 = new UserActionLogPOJO("userID1","productID3",10);

        userActionLogs.add(userActionLog1);

        UserActionLogPOJO userActionLog2 = new UserActionLogPOJO("userID2","productID4",20);
//        userActionLog2.setUserID("userID2");
//        userActionLog2.setProductPrice(10);

        userActionLogs.add(userActionLog2);

        UserActionLogPOJO userActionLog3 = new UserActionLogPOJO("userID1","productID5",30);
//        userActionLog3.setUserID("userID1");
//        userActionLog3.setProductID("productID5");
//        userActionLog3.setProductPrice(30);

        userActionLogs.add(userActionLog3);

        DataStreamSource<UserActionLogPOJO> source = env.fromCollection(userActionLogs);

        // 转换: KeyBy对数据重分区
        // 这里, UserActionLog是POJO类型,也可通过keyBy("userID")进行分区
        KeyedStream<UserActionLogPOJO, String> keyedStream = source.keyBy(new KeySelector<UserActionLogPOJO, String>() {
            @Override
            public String getKey(UserActionLogPOJO value) throws Exception
            {
                return value.getUserID();
            }
        });
//        keyedStream.print();

        // 转换: Aggregate并输出
        // 滚动求和并输出
        keyedStream.sum("productPrice").print();
        // 滚动求最大值并输出
        keyedStream.max("productPrice").print();
        // 滚动求最大值并输出
        keyedStream.maxBy("productPrice").print();
        // 滚动求最小值并输出
//        keyedStream.min("productPrice").print();
        // 滚动求最小值并输出
        //keyedStream.minBy("productPrice").print();

        env.execute();
    }
}

//这里注意哈，流数据的最大值和批数据的最大值不是一个意思．
//流数据的最大值指的是一边滚动一边求最大值的．所以会输出一个最大值序列，而不是只有一个数值